<?php
//php workermanServer.php start -d
use Workerman\Worker;
use \Workerman\MySQL\Connection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

//https://www.workerman.net/doc/workerman/components/workerman-mysql.html

//设置时区
ini_set('date.timezone','Asia/Shanghai');

// 创建一个文本协议的Worker监听2347接口
$worker = new Worker("websocket://0.0.0.0:8484");

// 只启动1个进程，这样方便客户端之间传输数据
$worker->count = 1;

//每个连接唯一id
$global_uid = 0;

/**
 * 当客户端与Workerman建立连接时(TCP三次握手完成后)触发的回调函数。
 * 每个连接只会触发一次onConnect回调。
 * @param TcpConnection $connection 连接对象，即TcpConnection实例，用于操作客户端连接，如发送数据，关闭连接等
 * @return void
 */
$worker->onConnect = function (TcpConnection $connection) {
    // 为这个连接分配一个uid
    global $global_uid;
    $connection->uid = $global_uid;

    //workerman是多进程的，每个进程内部会维护一个自增的connection id，所以多个进程之间的connection id会有重复。
    //如果想要不重复的connection id 可以根据需要给connection->id重新赋值，例如加上worker->id前缀。
    $connection->id = $global_uid ."_". $connection->id;

    $global_uid ++;
    $connection->send(json_encode([
      "type" => "uid",
      "uid"  => $connection->uid ,
    ]));
};

/**
 * 设置Worker子进程启动时的回调函数，每个子进程启动时都会执行
 * @param Worker $worker Worker对象
 * @return void
 */
$worker->onWorkerStart = function (Worker $worker) {
    // 将db实例存储在全局变量中(也可以存储在某类的静态成员中)
    global $my_conn;
    $my_conn = new Connection("127.0.0.1", '3306', 'root', '', 'chat');
};

/**
 * 设置Worker收到reload信号后执行的回调
 * @param Worker $worker orker对象
 * @return void
 */
$worker->onWorkerReload = function(Worker $worker)
{
    foreach ($worker->connections as $connection) {
        $connection->send('worker reloading');
    }
};

/**
 * 当客户端通过连接发来数据时(Workerman收到数据时)触发的回调函数
 * @param TcpConnection $connection 连接对象，即TcpConnection实例，用于操作客户端连接，如发送数据，关闭连接等
 * @param $param 客户端连接上发来的数据，如果Worker指定了协议，则是对应协议decode（解码）了的数据。
 * @return void
 */
$worker->onMessage = function(TcpConnection $connection, $param)
{
    global $my_conn;
    global $worker;
    $param = json_decode(urldecode($param), true);
    echo "\nonMessage===================>接收到的参数：\n";
    print_r($param);

    switch ($param["type"]) {
        case "login":
            $tb = "yi_user";
            if (isset($param["user_id"]) && $param["user_id"]) {
                updateUserField(["status"=>1], "id = {$param["user_id"]}");
                $info = $my_conn->select("id,username,avatar,sign,status")->from($tb)->where("id = {$param["user_id"]}")->row();
            } else {
                $info = $my_conn->select("id,username,avatar,sign,status")->from($tb)->where("username = '{$param["username"]}'")->row();
                if (!$info) {
                    $info = [
                      "username" => $param["username"],
                      "status"   => 1,
                      "avatar"   => "/upload/header/h" .$param["header"]. ".jpg"
                    ];
                    $info["id"] = $my_conn->insert($tb)->cols($info)->query();
                }
            }
            $connection->send(json_encode($info, JSON_UNESCAPED_SLASHES|JSON_UNESCAPED_UNICODE));
            break;
        case "logout":
            updateUserField(["status"=>1,"fd"=>0], "id = {$param["user_id"]}");
            $connection->send('{"type":"logout"}');
            sendFriend($param["user_id"], "syncLogout", "你的好友【{$param["username"]}】下线了");
            break;
        case "setFd":
            updateUserField(["fd"=>$param["fd"]], "id = {$param["user_id"]}");
            sendFriend($param["user_id"], "syncLogin", "你的好友【{$param["username"]}】上线了");

            //未读消息提示
            $row = $my_conn->select("id")->from("yi_system_message")->where("user_id = {$param["user_id"]}")->where("`read` = 0")->column();
            if ($row) {
                $connection->send(json_encode(["type"=>"msgBox", "total"=>count($row)]));
            }
            break;
        case "chat":
            $is_push = 0;
            $to_fd = $my_conn->select('fd')->from('yi_user')->where("id=". $param["to_id"])->where("status", 1)->single();
            if ($to_fd) {
                $param["type"] = "chat";
                sendByFd($to_fd, $param);
                $is_push = 1;
            }
            $my_conn->insert("yi_chat")->cols([
                  "user_id"    => (int)$param["user_id"],
                  "user_name"  => $param["user_name"],
                  "to_id"      => (int)$param["to_id"],
                  "to_name"    => $param["to_name"],
                  "content"    => $param["content"],
                  "create_at"  => date("Y-m-d H:i;s"),
                  "is_push"    => $is_push
            ])->query();
            break;
        case "chatGroup":
            $my_conn->insert("yi_chat_group")->cols([
              "group_id"   => (int)$param["group_id"],
              "group_name" => $param["group_name"],
              "user_id"    => (int)$param["user_id"],
              "user_name"  => $param["user_name"],
              "content"    => $param["content"],
              "create_at"  => date("Y-m-d H:i;s"),
            ])->query();

            //获取群组在线成员
            $list = $my_conn
                      ->select("yi_user.fd")
                      ->from("yi_group_member")
                      ->innerJoin("yi_user","yi_group_member.user_id=yi_user.id")
                      ->where("yi_group_member.group_id = ". (int)$param["group_id"])
                      ->where("yi_user.status = 1")
                      ->column();
            if ($list) {
                foreach ($worker->connections as $conn) {
                    if ($conn->uid != $connection->uid && in_array($conn->uid, $list)) {
                        $param["type"] = "chatGroup";
                        $conn->send(json_encode($param));
                    }
                }
            }
            break;
        case "setOnline":
            $s = $param["val"] == "online" ? 1 : 2;
            updateUserField(["status"=>$s], "fd = {$connection->uid}");
            break;
        case "setSign":
            updateUserField(["sign"=>$param["val"]], "fd = {$connection->uid}");
            break;
        case "addFriend":
            $my_conn->insert("yi_system_message")->cols([
              "user_id"   => (int)$param["to_id"],
              "from_id"   => (int)$param["from_id"],
              "group_id"  => (int)$param["gid"],
              "is_group"  => 0,
              "remark"    => $param["remark"] ?: null,
              "time"      => time()
            ])->query();

            //判断添加的好友是否在线，如果在线则发送通知
            sendByUid($param["to_id"], ["type"=> "msgBox", "total"=>1]);
            break;
        case "addGroup":
            $user = $my_conn
                      ->select("yi_user.id,yi_user.fd")
                      ->from("yi_group")
                      ->innerJoin("yi_user", "yi_user.id=yi_group.user_id")
                      ->where("yi_group.id = {$param["to_id"]}")
                      ->row();

            $my_conn->insert("yi_system_message")->cols([
              "user_id"   => (int)$user["id"],
              "from_id"   => (int)$param["from_id"],
              "group_id"  => (int)$param["to_id"],
              "is_group"  => 1,
              "remark"    => $param["remark"] ?: null,
              "time"      => time()
            ])->query();

            //判断群主是否在线，如果在线则发送通知
            sendByFd($user["fd"], ["type"=> "msgBox", "total"=>1]);
            break;
        case "addAgree":
            $where = [
              "id=" . (int)$param["id"],
              "user_id=" . (int)$param["user_id"],
              "from_id=" . (int)$param["from_id"],
              "status=0"
            ];
            $message = $my_conn->select("is_group,group_id")->from("yi_system_message")->where($where)->row();
            if ($message) {
                //更新系统信息状态为同意
                $my_conn->update("yi_system_message")->where($where)->cols(["status" => 1])->query();
                //为请求者记录结果
                $my_conn->insert("yi_system_message")->cols([
                  "user_id"  => (int)$param["from_id"],
                  "from_id"  => (int)$param["user_id"],
                  "is_group" => (int)$message["is_group"],
                  "group_id" => (int)$message["group_id"],
                  "remark"   => "同意你的请求",
                  "type"     => 1,
                  "status"   => 1,
                  "time"     => time()
                ])->query();

                //获取申请者信息
                $from = $my_conn->select("username,avatar,sign")->from("yi_user")->where("id = {$param["from_id"]}")->row();

                $return = [];
                switch ($message["is_group"]) {
                    case 0: //加好友
                        $is = $my_conn->select("id")->from("yi_friend")->where("user_id={$message["user_id"]}")->where("friend_id={$param["from_id"]}")->row();
                        if (!$is) {
                            $my_conn->insert("yi_friend")->cols([
                              "user_id"         => (int)$param["user_id"],
                              "friend_id"       => (int)$param["from_id"],
                              "friend_group_id" => (int)$param["group_id"],
                            ])->query();
                        }

                        $is = $my_conn->select("id")->from("yi_friend")->where("user_id={$message["from_id"]}")->where("friend_id={$param["user_id"]}")->row();
                        if (!$is) {
                            $my_conn->insert("yi_friend")->cols([
                              "user_id"         => (int)$param["from_id"],
                              "friend_id"       => (int)$param["user_id"],
                              "friend_group_id" => (int)$message["group_id"],
                            ])->query();

                            $return = [
                              "type"      => "friend",
                              "avatar"    => $from["avatar"],
                              "username"  => $from["username"],
                              "groupid"   => (int)$message["group_id"],
                              "id"        => (int)$param["from_id"],
                              "remark"    => $from["sign"],
                            ];
                        }
                        break;
                    case 1: //加群
                        $is = $my_conn->select("id")->from("yi_group_member")->where("group_id={$message["group_id"]}")->where("user_id={$param["from_id"]}")->row();
                        if (!$is) {
                            //把申请用户加入群主
                            $my_conn->insert("yi_group_member")->cols([
                              "group_id" => (int)$message["group_id"],
                              "user_id"  => (int)$param["from_id"]
                            ])->query();

                            //获取群信息
                            $group = $my_conn->select("groupname,COUNT(groupname) num")->from("yi_group")->where("id = {$message["group_id"]}")->row();

                            //返回信息
                            $return = [
                              "type"      => "group",
                              "avatar"    => $from["avatar"],
                              "groupname" => $group["groupname"],
                              "id"        => (int)$param["from_id"],
                              "members"   => $group["num"],
                            ];
                        }
                        break;
                }

                //如果申请人在线，同步给申请人
                sendByUid($param["from_id"], ["type"=> "addSuccess", "data"=>$return]);
            }
            break;
        case "addRefuse":
            $where = [
              "id=" . (int)$param["id"],
              "user_id=" . (int)$param["user_id"],
              "from_id=" . (int)$param["from_id"],
              "status=0"
            ];
            $message = $my_conn->from("yi_system_message")->select("is_group,group_id")->where($where)->row();

            if ($message) {
                $my_conn->update("yi_system_message")->where($where)->cols(["status" => 2])->query();
                $my_conn->insert("yi_system_message")->cols([
                  "user_id"  => (int)$param["from_id"],
                  "from_id"  => (int)$param["user_id"],
                  "is_group" => (int)$message["is_group"],
                  "group_id" => (int)$message["group_id"],
                  "remark"   => "拒绝你的请求",
                  "type"     => 1,
                  "status"   => 2,
                  "time"     => time()
                ])->query();

                //如果申请人在线，同步给申请人
                sendByUid($param["from_id"], ["type"=> "msgBox", "total"=>1]);
            }
            break;
    }
};

/**
 * 当客户端连接与Workerman断开时触发的回调函数。
 * 不管连接是如何断开的，只要断开就会触发onClose。每个连接只会触发一次onClose。
 * @param TcpConnection $connection 连接对象，即TcpConnection实例，用于操作客户端连接，如发送数据，关闭连接等
 * @return void
 */
$worker->onClose = function(TcpConnection $connection)
{
    global $my_conn;
    $user = $my_conn->select("id,username")->from("yi_user")->where("fd = ". $connection->uid)->row();
    if ($user) {
        sendFriend($user["id"], "syncLogout", "你的好友【{$user["username"]}】下线了");
    }
};

function sendFriend ($id, $type, $msg) {
     global $my_conn;
     $friend = $my_conn
              ->select("yi_user.fd")
              ->from("yi_friend")
              ->innerJoin("yi_user", "yi_user.id=yi_friend.friend_id")
              ->where("yi_friend.user_id = {$id}")
              ->where("yi_friend.status = 0")
              ->where("yi_user.status > 0")
              ->column();
     if ($friend) {
         sendByFd($friend, ["type" => $type,"msg"  => $msg]);
     }
}

function updateUserField ($cols, $where) {
    global $my_conn;
    $my_conn->update("yi_user")->cols($cols)->where($where)->query();
}

function sendByFd($fd, $msg) {
    if ($fd) {
        global $worker;
        if (is_array($msg)) {
            $msg = json_encode($msg,JSON_UNESCAPED_SLASHES|JSON_UNESCAPED_UNICODE);
        }
        foreach ($worker->connections as $conn) {
            if (is_array($fd) && in_array($conn->uid, $fd)) {
                $conn->send($msg);
            } elseif ($conn->uid == $fd) {
                $conn->send($msg);
            }
        }
    }
}

function sendByUid($uid, $msg) {
    global $my_conn;
    $fd = $my_conn->select("fd")->from("yi_user")->where("id = {$uid}")->where("status > 0")->single();
    sendByFd($fd, $msg);
}

// 运行worker
Worker::runAll();